Today we'll walk through the behavior defined inside webSocket("/echo"):
send("Please enter your name")
for (frame in incoming) {
frame as? Frame.Text ?: continue
val receivedText = frame.readText()
if (receivedText.equals("bye", ignoreCase = true)) {
close(CloseReason(CloseReason.Codes.NORMAL, "Client said BYE"))
} else {
send(Frame.Text("Hi, $receivedText!"))
}
}
We'll go through what each part means.
Let's start with send:
/**
* Enqueues a text frame for sending with the specified [content].
*
* May suspend if the outgoing queue is full, and throw an exception if the channel is already closed.
*/
public suspend fun WebSocketSession.send(content: String): Unit = send(Frame.Text(content))
The Frame referred to here is declared as:
/**
* A frame received or ready to be sent. It is not reusable and not thread-safe
* @property fin is it final fragment, should be always `true` for control frames and if no fragmentation is used
* @property frameType enum value
* @property data - a frame content or fragment content
* @property disposableHandle could be invoked when the frame is processed
*/
public actual sealed class Frame actual constructor(
public actual val fin: Boolean,
public actual val frameType: FrameType,
public actual val data: ByteArray,
public actual val disposableHandle: DisposableHandle,
public actual val rsv1: Boolean,
public actual val rsv2: Boolean,
public actual val rsv3: Boolean
)
Frame.Text(content), in turn, is this constructor:
public actual constructor(text: String) : this(true, text.toByteArray())
The implementation of WebSocketSession.send(frame: Frame) is:
/**
* Enqueue a frame, may suspend if an outgoing queue is full. May throw an exception if the
* outgoing channel is already closed, so it is impossible to transfer any message.
* Frames that were sent after close frame could be silently ignored.
* Note that a close frame could be sent automatically in reply to a peer's close frame unless it is
* raw WebSocket session.
*/
public suspend fun send(frame: Frame) {
outgoing.send(frame)
}
The outgoing property used here is:
/**
* An outgoing frames channel. It could have limited capacity so sending too many frames may lead to suspension at
* corresponding send invocations. It also may suspend if a peer doesn't read frames for some reason.
*/
public val outgoing: SendChannel<Frame>
From this we can see that send("Please enter your name") wraps the text in a Frame object (a Frame.Text) and sends it into the outgoing SendChannel.
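To make the wrapping step concrete, here is a minimal sketch that uses only the Kotlin standard library. SimpleTextFrame is a hypothetical stand-in for Frame.Text, not the real Ktor class; it only mirrors the idea that the String constructor produces a single final frame holding the text's bytes.

```kotlin
// Hypothetical, simplified stand-in for Ktor's Frame.Text (NOT the real class).
class SimpleTextFrame(val fin: Boolean, val data: ByteArray) {
    // Mirrors the idea of Frame.Text(text): one final frame carrying UTF-8 bytes.
    constructor(text: String) : this(true, text.encodeToByteArray())

    // Decodes the payload back into a String, assuming it is a complete UTF-8 sequence.
    fun readText(): String = data.decodeToString()
}

// Builds the frame that a call like send("...") would enqueue in this simplified model.
fun wrap(text: String): SimpleTextFrame = SimpleTextFrame(text)
```

In this sketch, wrap("Please enter your name") yields a frame with fin set to true and a 22-byte payload, since every character in that string is ASCII.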
As for what a Frame actually is, the details are covered in "The websocket connection lifespan and frame structure" and in RFC 6455, section 5.2 (Base Framing Protocol). We won't dig that far down into the implementation today.
Moving on to for (frame in incoming), incoming is:
/**
* An incoming frames channel.
* Note that if you use `webSocket` to handle a WebSockets session,
* the incoming channel doesn't contain control frames such as the ping/pong or close frames.
* If you need control over control frames, use the `webSocketRaw` function.
*/
public val incoming: ReceiveChannel<Frame>
Since incoming can be used in a for loop, ReceiveChannel must provide an iterator operator, which returns a ChannelIterator:
/**
* Returns a new iterator to receive elements from this channel using a `for` loop.
* Iteration completes normally when the channel [is closed for `receive`][isClosedForReceive] without a cause and
* throws the original [close][SendChannel.close] cause exception if the channel has _failed_.
*/
public operator fun iterator(): ChannelIterator<E>
/**
* Iterator for [ReceiveChannel]. Instances of this interface are *not thread-safe* and shall not be used
* from concurrent coroutines.
*/
public interface ChannelIterator<out E>
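As a rough illustration of why declaring operator fun iterator() is enough for a for loop, here is a small standard-library sketch. FrameQueue and collect are hypothetical names, and unlike ReceiveChannel nothing here suspends: the real ChannelIterator waits for the next element, while this one just walks a list.

```kotlin
// Hypothetical in-memory queue; unlike ReceiveChannel, nothing here suspends.
class FrameQueue(private val items: List<String>) {
    // Declaring this operator is what allows `for (item in queue)`.
    operator fun iterator(): Iterator<String> = items.iterator()
}

// Drains the queue with a for loop and returns what it saw, in order.
fun collect(queue: FrameQueue): List<String> {
    val seen = mutableListOf<String>()
    for (item in queue) { // compiles because FrameQueue declares operator fun iterator()
        seen += item
    }
    return seen
}
```

The loop ends when the iterator reports no more elements, which loosely corresponds to the channel being closed for receive.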
That covers how for (frame in incoming) keeps pulling Frame objects out of the channel.
Next, frame as? Frame.Text ?: continue tries to cast the Frame to Frame.Text:
/**
* Represents an application level text frame.
* In a RAW web socket session a big text frame could be fragmented
* (separated into several text frames so they have [fin] = false except the last one).
* Please note that a boundary between fragments could be in the middle of multi-byte (unicode) character
* so don't apply String constructor to every fragment but use decoder loop instead of concatenate fragments first.
* Note that usually there is no need to handle fragments unless you have a RAW web socket session.
*/
public actual class Text actual constructor(
fin: Boolean,
data: ByteArray,
rsv1: Boolean,
rsv2: Boolean,
rsv3: Boolean
) : Frame(fin, FrameType.TEXT, data, NonDisposableHandle, rsv1, rsv2, rsv3) {
public actual constructor(fin: Boolean, data: ByteArray) : this(fin, data, false, false, false)
public actual constructor(text: String) : this(true, text.toByteArray())
public actual constructor(fin: Boolean, packet: ByteReadPacket) : this(fin, packet.readBytes())
public constructor(fin: Boolean, buffer: ByteBuffer) : this(fin, buffer.moveToByteArray())
}
If the frame can't be cast to Frame.Text, continue skips ahead to the next Frame.
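The safe cast plus Elvis pattern can be tried outside Ktor. The sketch below uses a hypothetical Msg hierarchy (TextMsg and BinaryMsg are illustrative names, not Ktor types) to show how non-text items are skipped.

```kotlin
// Hypothetical message hierarchy standing in for Frame and its subclasses.
sealed class Msg
class TextMsg(val text: String) : Msg()
class BinaryMsg(val bytes: ByteArray) : Msg()

// Returns the text of every TextMsg, skipping everything else.
fun textsOnly(messages: List<Msg>): List<String> {
    val result = mutableListOf<String>()
    for (msg in messages) {
        // `as?` yields null for non-text messages; `?: continue` then skips them.
        val textMsg = msg as? TextMsg ?: continue
        result += textMsg.text
    }
    return result
}
```

Binding the cast result to a val, as done here, is a design choice for readability; the original snippet instead relies on the compiler smart-casting frame after the same expression.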
If the cast succeeds, execution continues with val receivedText = frame.readText().
Frame.Text.readText() is defined as:
/**
* Reads text content from the text frame.
* Shouldn't be used for fragmented frames: such frames need to be reassembled first.
*/
public fun Frame.Text.readText(): String {
require(fin) { "Text could be only extracted from non-fragmented frame" }
return Charsets.UTF_8.newDecoder().decode(buildPacket { writeFully(data) })
}
Here a CharsetDecoder converts data from a ByteArray into a String.
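The KDoc warning about fragments is easier to appreciate with a small experiment. This standard-library sketch (splitUtf8, decodeSeparately, and decodeJoined are hypothetical helper names) splits a UTF-8 byte sequence in the middle of a multi-byte character and compares decoding each piece on its own with decoding the reassembled bytes.

```kotlin
// Encodes `text` as UTF-8 and cuts the byte array at index `at`.
fun splitUtf8(text: String, at: Int): Pair<ByteArray, ByteArray> {
    val bytes = text.encodeToByteArray()
    return bytes.copyOfRange(0, at) to bytes.copyOfRange(at, bytes.size)
}

// Decodes each fragment on its own; a character cut in half becomes replacement characters.
fun decodeSeparately(a: ByteArray, b: ByteArray): String =
    a.decodeToString() + b.decodeToString()

// Reassembles the bytes first, then decodes once.
fun decodeJoined(a: ByteArray, b: ByteArray): String =
    (a + b).decodeToString()
```

For example, "h\u00E9llo" encodes to six bytes, with the second character taking two of them. Cutting at index 2 lands inside that character, so only decodeJoined recovers the original string.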
It also uses a handy Kotlin idiom, require:
/**
* Throws an [IllegalArgumentException] with the result of calling [lazyMessage] if the [value] is false.
*
* @sample samples.misc.Preconditions.failRequireWithLazyMessage
*/
@kotlin.internal.InlineOnly
public inline fun require(value: Boolean, lazyMessage: () -> Any): Unit {
contract {
returns() implies value
}
if (!value) {
val message = lazyMessage()
throw IllegalArgumentException(message.toString())
}
}
require lets a function check a precondition up front, throwing IllegalArgumentException when the condition isn't met.
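Here is a small sketch of the same precondition style in isolation. readWholeText is a hypothetical function that imitates the shape of readText() using only the standard library; it is not the Ktor implementation.

```kotlin
// Hypothetical imitation of the readText() precondition, using only the standard library.
fun readWholeText(fin: Boolean, data: ByteArray): String {
    // The lambda is only evaluated when `fin` is false, so building the message costs nothing otherwise.
    require(fin) { "Text could be only extracted from non-fragmented frame" }
    return data.decodeToString()
}
```

Calling readWholeText(false, ...) throws IllegalArgumentException carrying that message, while a call with fin set to true simply returns the decoded text.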
Once we have receivedText as a String, the next part is:
if (receivedText.equals("bye", ignoreCase = true)) {
close(CloseReason(CloseReason.Codes.NORMAL, "Client said BYE"))
}
The CloseReason.Codes.NORMAL used here is:
public enum class Codes(public val code: Short) {
NORMAL(1000),
}
While we're here, let's look at the rest of CloseReason.Codes:
/**
* Standard close reason codes
*
* see https://tools.ietf.org/html/rfc6455#section-7.4 for list of codes
*/
@Suppress("KDocMissingDocumentation")
public enum class Codes(public val code: Short) {
NORMAL(1000),
GOING_AWAY(1001),
PROTOCOL_ERROR(1002),
CANNOT_ACCEPT(1003),
@InternalAPI
@Deprecated("This code MUST NOT be set as a status code in a Close control frame by an endpoint")
CLOSED_ABNORMALLY(1006),
NOT_CONSISTENT(1007),
VIOLATED_POLICY(1008),
TOO_BIG(1009),
NO_EXTENSION(1010),
INTERNAL_ERROR(1011),
SERVICE_RESTART(1012),
TRY_AGAIN_LATER(1013);
public companion object {
private val byCodeMap = values().associateBy { it.code }
@Deprecated(
"Use INTERNAL_ERROR instead.",
ReplaceWith(
"INTERNAL_ERROR",
"io.ktor.websocket.CloseReason.Codes.INTERNAL_ERROR"
)
)
@JvmField
@Suppress("UNUSED")
public val UNEXPECTED_CONDITION: Codes = INTERNAL_ERROR
/**
* Get enum value by close reason code
* @return enum instance or null if [code] is not in standard
*/
public fun byCode(code: Short): Codes? = byCodeMap[code]
}
}
These codes are likewise defined according to RFC 6455.
With a code in hand, we can build a CloseReason object:
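The byCode lookup is a common enum pattern and can be sketched on its own. CloseCode below is a hypothetical, trimmed-down enum with only three of the codes listed above; describe is an illustrative helper that is not part of Ktor.

```kotlin
// Hypothetical, trimmed-down version of the Codes enum (only three entries).
enum class CloseCode(val code: Short) {
    NORMAL(1000),
    GOING_AWAY(1001),
    INTERNAL_ERROR(1011);

    companion object {
        // Built once: maps each numeric code to its enum entry.
        private val byCodeMap = values().associateBy { it.code }

        // Returns the matching entry, or null for a code this enum doesn't know.
        fun byCode(code: Short): CloseCode? = byCodeMap[code]
    }
}

// Illustrative helper: a readable label for any numeric close code.
fun describe(code: Int): String =
    CloseCode.byCode(code.toShort())?.name ?: "UNKNOWN($code)"
```

Returning null for unknown codes matters because peers may send codes outside the standard list, so callers need a fallback such as the one in describe.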
/**
* A WebSocket close reason.
* @property code - close reason code as per RFC 6455, recommended to be one of [CloseReason.Codes]
* @property message - a close reason message, could be empty
*/
public data class CloseReason(val code: Short, val message: String) {
public constructor(code: Codes, message: String) : this(code.code, message)
and send the close message through close:
/**
* Sends a close frame with the specified [reason]. May suspend if the outgoing channel is full.
* The specified [reason] could be ignored if there was already
* close frame sent (for example in reply to a peer close frame). It also may do nothing when a session or an outgoing
* channel is already closed due to any reason.
*/
public suspend fun WebSocketSession.close(reason: CloseReason = CloseReason(CloseReason.Codes.NORMAL, "")) {
try {
send(Frame.Close(reason))
flush()
} catch (_: Throwable) {
}
}
WebSocketSession.flush, called here, is defined as:
/**
* Flushes all outstanding messages and suspends until all earlier sent messages will be written.
* Could be called at any time even after close. May return immediately if the connection is already terminated.
* However, it may also fail with an exception (or cancellation) at any point due to a session failure.
* Note that [flush] doesn't guarantee that frames were actually delivered.
*/
public suspend fun flush()
For an implementation, we can look at RawWebSocketJvm.flush:
override suspend fun flush(): Unit = writer.flush()
If the text isn't "bye" and the session isn't closed, the else branch runs send(Frame.Text("Hi, $receivedText!")) and the loop keeps going.
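The branching itself can be pulled out into a pure function, which makes it easy to reason about without a live connection. replyFor is a hypothetical helper, not something the sample or Ktor defines; null here stands for "close the session instead of replying".

```kotlin
// Hypothetical helper: the greeting to send back, or null when the session should be closed.
fun replyFor(receivedText: String): String? =
    if (receivedText.equals("bye", ignoreCase = true)) null else "Hi, $receivedText!"
```

Under this sketch, replyFor("BYE") returns null because the comparison ignores case, and replyFor("Ktor") returns "Hi, Ktor!".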
At this point, we've briefly walked through this block:
send("Please enter your name")
for (frame in incoming) {
frame as? Frame.Text ?: continue
val receivedText = frame.readText()
if (receivedText.equals("bye", ignoreCase = true)) {
close(CloseReason(CloseReason.Codes.NORMAL, "Client said BYE"))
} else {
send(Frame.Text("Hi, $receivedText!"))
}
}
and the implementation behind it.
Tomorrow we'll look at how Frame is structured.